/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
*/
package org.apache.kylin.storage.hbase.steps;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.mapreduce.KeyValueSerialization;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TextSortReducer;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.kylin.common.util.RandomUtil;

import org.apache.kylin.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.kylin.shaded.com.google.common.base.Strings;

/**
 * Copied from HBase's org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2, with fix attempt on KYLIN-2788
 *
 * Writes HFiles. Passed Cells must arrive in order.
 * Writes current time as the sequence id for the file. Sets the major compacted
 * attribute on created @{link {@link HFile}s. Calling write(null,null) will forcibly roll
 * all HFiles being written.
 * <p>
 * Using this class as part of a MapReduce job is best done
 * using {@link #configureIncrementalLoad(Job, Table, RegionLocator)}.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable, Cell> {
    static Log LOG = LogFactory.getLog(HFileOutputFormat3.class);

    // The following constants are private since these are used by
    // HFileOutputFormat2 to internally transfer data between job setup and
    // reducer run using conf.
    // These should not be changed by the client.
    private static final String COMPRESSION_FAMILIES_CONF_KEY = "hbase.hfileoutputformat.families.compression";
    private static final String BLOOM_TYPE_FAMILIES_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype";
    private static final String BLOCK_SIZE_FAMILIES_CONF_KEY = "hbase.mapreduce.hfileoutputformat.blocksize";
    private static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY = "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";

    // This constant is public since the client can modify this when setting
    // up their conf object and thus refer to this symbol.
    // It is present for backwards compatibility reasons. Use it only to
    // override the auto-detection of datablock encoding.
    public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY = "hbase.mapreduce.hfileoutputformat.datablock.encoding";

    /**
     * Keep locality while generating HFiles for bulkload. See HBASE-12596
     */
    public static final String LOCALITY_SENSITIVE_CONF_KEY = "hbase.bulkload.locality.sensitive.enabled";
    private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
    private static final String OUTPUT_TABLE_NAME_CONF_KEY = "hbase.mapreduce.hfileoutputformat.table.name";

    private static final String BULKLOAD_HCONNECTION_CONF_KEY = "hbase.bulkload.hconnection.configuration";

    @Override
    public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(final TaskAttemptContext context)
            throws IOException, InterruptedException {
        return createRecordWriter(context, this.getOutputCommitter(context));
    }

    static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWriter(final TaskAttemptContext context,
            final OutputCommitter committer) throws IOException, InterruptedException {

        // Get the path of the temporary output file
        final Path outputdir = ((FileOutputCommitter) committer).getWorkPath();
        final Configuration conf = context.getConfiguration();
        LOG.debug("Task output path: " + outputdir);
        final FileSystem fs = outputdir.getFileSystem(conf);
        // These configs. are from hbase-*.xml
        final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE);
        // Invented config.  Add to hbase-*.xml if other than default compression.
        final String defaultCompressionStr = conf.get("hfile.compression", Compression.Algorithm.NONE.getName());
        final Algorithm defaultCompression = AbstractHFileWriter.compressionByName(defaultCompressionStr);
        final boolean compactionExclude = conf.getBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
                false);

        // create a map from column family to the compression algorithm
        final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
        final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
        final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);

        String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
        final Map<byte[], DataBlockEncoding> datablockEncodingMap = createFamilyDataBlockEncodingMap(conf);
        final DataBlockEncoding overriddenEncoding;
        if (dataBlockEncodingStr != null) {
            overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
        } else {
            overriddenEncoding = null;
        }

        final Configuration hConnectionConf = getConfigureHConnection(conf);
        
        return new RecordWriter<ImmutableBytesWritable, V>() {
            // Map of families to writers and how much has been output on the writer.
            private final Map<byte[], WriterLength> writers = new TreeMap<byte[], WriterLength>(Bytes.BYTES_COMPARATOR);
            private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
            private final byte[] now = Bytes.toBytes(System.currentTimeMillis());
            private boolean rollRequested = false;

            @Override
            public void write(ImmutableBytesWritable row, V cell) throws IOException {
                KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
                if (row == null && kv == null) {
                    rollWriters();
                    return;
                }
                byte[] rowKey = CellUtil.cloneRow(kv);
                long length = kv.getLength();
                byte[] family = CellUtil.cloneFamily(kv);
                WriterLength wl = this.writers.get(family);
                if (wl == null) {
                    fs.mkdirs(new Path(outputdir, Bytes.toString(family)));
                }
                if (wl != null && wl.written + length >= maxsize) {
                    this.rollRequested = true;
                }
                if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
                    rollWriters();
                }
                if (wl == null || wl.writer == null) {
                    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
                        HRegionLocation loc = null;
                        String tableName = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
                        if (tableName != null) {
                            try (Connection connection = ConnectionFactory.createConnection(hConnectionConf);
                                    RegionLocator locator = connection.getRegionLocator(TableName.valueOf(tableName))) {
                                loc = locator.getRegionLocation(rowKey);
                            } catch (Throwable e) {
                                LOG.warn("there's something wrong when locating rowkey: " + Bytes.toString(rowKey), e);
                                loc = null;
                            }
                        }

                        if (null == loc) {
                            if (LOG.isTraceEnabled()) {
                                LOG.trace("failed to get region location, so use default writer: " +
                                        Bytes.toString(rowKey));
                            }
                            wl = getNewWriter(family, conf, null);
                        } else {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("first rowkey: [" + Bytes.toString(rowKey) + "]");
                            }
                            InetSocketAddress initialIsa =
                                    new InetSocketAddress(loc.getHostname(), loc.getPort());
                            if (initialIsa.isUnresolved()) {
                                if (LOG.isTraceEnabled()) {
                                    LOG.trace("failed to resolve bind address: " + loc.getHostname() + ":"
                                            + loc.getPort() + ", so use default writer");
                                }
                                wl = getNewWriter(family, conf, null);
                            } else {
                                if(LOG.isDebugEnabled()) {
                                    LOG.debug("use favored nodes writer: " + initialIsa.getHostString());
                                }
                                wl = getNewWriter(family, conf, new InetSocketAddress[] { initialIsa });
                            }
                        }
                    } else {
                        wl = getNewWriter(family, conf, null);
                    }
                }
                kv.updateLatestStamp(this.now);
                wl.writer.append(kv);
                wl.written += length;
                this.previousRow = rowKey;
            }

            private void rollWriters() throws IOException {
                for (WriterLength wl : this.writers.values()) {
                    if (wl.writer != null) {
                        LOG.info("Writer=" + wl.writer.getPath() + ((wl.written == 0) ? "" : ", wrote=" + wl.written));
                        close(wl.writer);
                    }
                    wl.writer = null;
                    wl.written = 0;
                }
                this.rollRequested = false;
            }

            @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED", justification = "Not important")
            private WriterLength getNewWriter(byte[] family, Configuration conf, InetSocketAddress[] favoredNodes)
                    throws IOException {
                WriterLength wl = new WriterLength();
                Path familydir = new Path(outputdir, Bytes.toString(family));
                Algorithm compression = compressionMap.get(family);
                compression = compression == null ? defaultCompression : compression;
                BloomType bloomType = bloomTypeMap.get(family);
                bloomType = bloomType == null ? BloomType.NONE : bloomType;
                Integer blockSize = blockSizeMap.get(family);
                blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
                DataBlockEncoding encoding = overriddenEncoding;
                encoding = encoding == null ? datablockEncodingMap.get(family) : encoding;
                encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
                Configuration tempConf = new Configuration(conf);
                tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
                HFileContextBuilder contextBuilder = new HFileContextBuilder().withCompression(compression)
                        .withChecksumType(HStore.getChecksumType(conf))
                        .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(blockSize);
                contextBuilder.withDataBlockEncoding(encoding);
                HFileContext hFileContext = contextBuilder.build();

                if (null == favoredNodes) {
                    wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
                            .withOutputDir(familydir).withBloomType(bloomType).withComparator(KeyValue.COMPARATOR)
                            .withFileContext(hFileContext).build();
                } else {
                    wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
                            .withOutputDir(familydir).withBloomType(bloomType).withComparator(KeyValue.COMPARATOR)
                            .withFileContext(hFileContext).withFavoredNodes(favoredNodes).build();
                }

                this.writers.put(family, wl);
                return wl;
            }

            private void close(final StoreFile.Writer w) throws IOException {
                if (w != null) {
                    w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
                    w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY, Bytes.toBytes(context.getTaskAttemptID().toString()));
                    w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, Bytes.toBytes(true));
                    w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY, Bytes.toBytes(compactionExclude));
                    w.appendTrackedTimestampsToMetadata();
                    w.close();
                }
            }

            @Override
            public void close(TaskAttemptContext c) throws IOException, InterruptedException {
                for (WriterLength wl : this.writers.values()) {
                    close(wl.writer);
                }
            }
        };
    }

    /*
     * Data structure to hold a Writer and amount of data written on it.
     */
    static class WriterLength {
        long written = 0;
        StoreFile.Writer writer = null;
    }

    /**
     * Return the start keys of all of the regions in this table,
     * as a list of ImmutableBytesWritable.
     */
    private static List<ImmutableBytesWritable> getRegionStartKeys(RegionLocator table) throws IOException {
        byte[][] byteKeys = table.getStartKeys();
        ArrayList<ImmutableBytesWritable> ret = new ArrayList<ImmutableBytesWritable>(byteKeys.length);
        for (byte[] byteKey : byteKeys) {
            ret.add(new ImmutableBytesWritable(byteKey));
        }
        return ret;
    }

    /**
     * Write out a {@link SequenceFile} that can be read by
     * {@link TotalOrderPartitioner} that contains the split points in startKeys.
     */
    @SuppressWarnings("deprecation")
    private static void writePartitions(Configuration conf, Path partitionsPath, List<ImmutableBytesWritable> startKeys)
            throws IOException {
        LOG.info("Writing partition information to " + partitionsPath);
        if (startKeys.isEmpty()) {
            throw new IllegalArgumentException("No regions passed");
        }

        // We're generating a list of split points, and we don't ever
        // have keys < the first region (which has an empty start key)
        // so we need to remove it. Otherwise we would end up with an
        // empty reducer with index 0
        TreeSet<ImmutableBytesWritable> sorted = new TreeSet<ImmutableBytesWritable>(startKeys);

        ImmutableBytesWritable first = sorted.first();
        if (!Arrays.equals(first.get(), HConstants.EMPTY_BYTE_ARRAY)) {
            throw new IllegalArgumentException("First region of table should have empty start key. Instead has: "
                    + Bytes.toStringBinary(first.get()));
        }
        sorted.remove(first);

        // Write the actual file
        FileSystem fs = partitionsPath.getFileSystem(conf);
        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, partitionsPath, ImmutableBytesWritable.class,
                NullWritable.class);

        try {
            for (ImmutableBytesWritable startKey : sorted) {
                writer.append(startKey, NullWritable.get());
            }
        } finally {
            writer.close();
        }
    }

    public static File configureHConnection(Job job, Configuration hConnectionConf, File tempDir) throws IOException {
        File tempFile = new File(tempDir, "HConfiguration-" + System.currentTimeMillis() + ".xml");
        tempFile.deleteOnExit();

        FileOutputStream os = new FileOutputStream(tempFile);
        hConnectionConf.writeXml(os);
        os.close();

        String tmpFiles = job.getConfiguration().get("tmpfiles", null);
        if (tmpFiles == null) {
            tmpFiles = fixWindowsPath("file://" + tempFile.getAbsolutePath());
        } else {
            tmpFiles += "," + fixWindowsPath("file://" + tempFile.getAbsolutePath());
        }
        job.getConfiguration().set("tmpfiles", tmpFiles);
        LOG.info("A temporary file " + tempFile.getAbsolutePath()
                + " is created for storing hconnection related configuration!!!");

        job.getConfiguration().set(BULKLOAD_HCONNECTION_CONF_KEY, tempFile.getName());
        return tempFile;
    }

    public static Configuration getConfigureHConnection(Configuration jobConf) {
        if (Strings.isNullOrEmpty(jobConf.get(BULKLOAD_HCONNECTION_CONF_KEY))) {
            return jobConf;
        }
        File tempFile = new File(jobConf.get(BULKLOAD_HCONNECTION_CONF_KEY));
        Configuration hConnectionConf = new Configuration(false);
        hConnectionConf.addResource(new Path(tempFile.toURI()));
        return hConnectionConf;
    }

    public static String fixWindowsPath(String path) {
        // fix windows path
        if (path.startsWith("file://") && !path.startsWith("file:///") && path.contains(":\\")) {
            path = path.replace("file://", "file:///");
        }
        if (path.startsWith("file:///")) {
            path = path.replace('\\', '/');
        }
        return path;
    }

    /**
     * Configure a MapReduce Job to perform an incremental load into the given
     * table. This
     * <ul>
     *   <li>Inspects the table to configure a total order partitioner</li>
     *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
     *   <li>Sets the number of reduce tasks to match the current number of regions</li>
     *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
     *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
     *     PutSortReducer)</li>
     * </ul>
     * The user should be sure to set the map output value class to either KeyValue or Put before
     * running this function.
     *
     * @deprecated Use {@link #configureIncrementalLoad(Job, Table, RegionLocator)} instead.
     */
    @Deprecated
    public static void configureIncrementalLoad(Job job, HTable table) throws IOException {
        configureIncrementalLoad(job, table.getTableDescriptor(), table.getRegionLocator());
    }

    /**
     * Configure a MapReduce Job to perform an incremental load into the given
     * table. This
     * <ul>
     *   <li>Inspects the table to configure a total order partitioner</li>
     *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
     *   <li>Sets the number of reduce tasks to match the current number of regions</li>
     *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
     *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
     *     PutSortReducer)</li>
     * </ul>
     * The user should be sure to set the map output value class to either KeyValue or Put before
     * running this function.
     */
    public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator) throws IOException {
        configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
    }

    /**
     * Configure a MapReduce Job to perform an incremental load into the given
     * table. This
     * <ul>
     *   <li>Inspects the table to configure a total order partitioner</li>
     *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
     *   <li>Sets the number of reduce tasks to match the current number of regions</li>
     *   <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
     *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
     *     PutSortReducer)</li>
     * </ul>
     * The user should be sure to set the map output value class to either KeyValue or Put before
     * running this function.
     */
    public static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor, RegionLocator regionLocator)
            throws IOException {
        configureIncrementalLoad(job, tableDescriptor, regionLocator, HFileOutputFormat3.class);
    }

    static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor, RegionLocator regionLocator,
            Class<? extends OutputFormat<?, ?>> cls) throws IOException, UnsupportedEncodingException {
        Configuration conf = job.getConfiguration();
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(KeyValue.class);
        job.setOutputFormatClass(cls);

        // Based on the configured map output class, set the correct reducer to properly
        // sort the incoming values.
        // TODO it would be nice to pick one or the other of these formats.
        if (KeyValue.class.equals(job.getMapOutputValueClass())) {
            job.setReducerClass(KeyValueSortReducer.class);
        } else if (Put.class.equals(job.getMapOutputValueClass())) {
            job.setReducerClass(PutSortReducer.class);
        } else if (Text.class.equals(job.getMapOutputValueClass())) {
            job.setReducerClass(TextSortReducer.class);
        } else {
            LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
        }

        conf.setStrings("io.serializations", conf.get("io.serializations"), MutationSerialization.class.getName(),
                ResultSerialization.class.getName(), KeyValueSerialization.class.getName());

        if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
            // record this table name for creating writer by favored nodes
            LOG.info("bulkload locality sensitive enabled");
            conf.set(OUTPUT_TABLE_NAME_CONF_KEY, regionLocator.getName().getNameAsString());
        }
        
        // Use table's region boundaries for TOP split points.
        LOG.info("Looking up current regions for table " + tableDescriptor.getTableName());
        List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocator);
        LOG.info("Configuring " + startKeys.size() + " reduce partitions " + "to match current region count");
        job.setNumReduceTasks(startKeys.size());

        configurePartitioner(job, startKeys);
        // Set compression algorithms based on column families
        configureCompression(conf, tableDescriptor);
        configureBloomType(tableDescriptor, conf);
        configureBlockSize(tableDescriptor, conf);
        configureDataBlockEncoding(tableDescriptor, conf);

        TableMapReduceUtil.addDependencyJars(job);
        TableMapReduceUtil.initCredentials(job);
        LOG.info("Incremental table " + regionLocator.getName() + " output configured.");
    }

    public static void configureIncrementalLoadMap(Job job, Table table) throws IOException {
        Configuration conf = job.getConfiguration();

        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(KeyValue.class);
        job.setOutputFormatClass(HFileOutputFormat3.class);

        // Set compression algorithms based on column families
        configureCompression(conf, table.getTableDescriptor());
        configureBloomType(table.getTableDescriptor(), conf);
        configureBlockSize(table.getTableDescriptor(), conf);
        HTableDescriptor tableDescriptor = table.getTableDescriptor();
        configureDataBlockEncoding(tableDescriptor, conf);

        TableMapReduceUtil.addDependencyJars(job);
        TableMapReduceUtil.initCredentials(job);
        LOG.info("Incremental table " + table.getName() + " output configured.");
    }

    /**
     * Runs inside the task to deserialize column family to compression algorithm
     * map from the configuration.
     *
     * @param conf to read the serialized values from
     * @return a map from column family to the configured compression algorithm
     */
    @VisibleForTesting
    static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration conf) {
        Map<byte[], String> stringMap = createFamilyConfValueMap(conf, COMPRESSION_FAMILIES_CONF_KEY);
        Map<byte[], Algorithm> compressionMap = new TreeMap<byte[], Algorithm>(Bytes.BYTES_COMPARATOR);
        for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
            Algorithm algorithm = AbstractHFileWriter.compressionByName(e.getValue());
            compressionMap.put(e.getKey(), algorithm);
        }
        return compressionMap;
    }

    /**
     * Runs inside the task to deserialize column family to bloom filter type
     * map from the configuration.
     *
     * @param conf to read the serialized values from
     * @return a map from column family to the the configured bloom filter type
     */
    @VisibleForTesting
    static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
        Map<byte[], String> stringMap = createFamilyConfValueMap(conf, BLOOM_TYPE_FAMILIES_CONF_KEY);
        Map<byte[], BloomType> bloomTypeMap = new TreeMap<byte[], BloomType>(Bytes.BYTES_COMPARATOR);
        for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
            BloomType bloomType = BloomType.valueOf(e.getValue());
            bloomTypeMap.put(e.getKey(), bloomType);
        }
        return bloomTypeMap;
    }

    /**
     * Runs inside the task to deserialize column family to block size
     * map from the configuration.
     *
     * @param conf to read the serialized values from
     * @return a map from column family to the configured block size
     */
    @VisibleForTesting
    static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
        Map<byte[], String> stringMap = createFamilyConfValueMap(conf, BLOCK_SIZE_FAMILIES_CONF_KEY);
        Map<byte[], Integer> blockSizeMap = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
        for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
            Integer blockSize = Integer.parseInt(e.getValue());
            blockSizeMap.put(e.getKey(), blockSize);
        }
        return blockSizeMap;
    }

    /**
     * Runs inside the task to deserialize column family to data block encoding
     * type map from the configuration.
     *
     * @param conf to read the serialized values from
     * @return a map from column family to HFileDataBlockEncoder for the
     *         configured data block type for the family
     */
    @VisibleForTesting
    static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(Configuration conf) {
        Map<byte[], String> stringMap = createFamilyConfValueMap(conf, DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
        Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<byte[], DataBlockEncoding>(Bytes.BYTES_COMPARATOR);
        for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
            encoderMap.put(e.getKey(), DataBlockEncoding.valueOf((e.getValue())));
        }
        return encoderMap;
    }

    /**
     * Run inside the task to deserialize column family to given conf value map.
     *
     * @param conf to read the serialized values from
     * @param confName conf key to read from the configuration
     * @return a map of column family to the given configuration value
     */
    private static Map<byte[], String> createFamilyConfValueMap(Configuration conf, String confName) {
        Map<byte[], String> confValMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
        String confVal = conf.get(confName, "");
        for (String familyConf : confVal.split("&")) {
            String[] familySplit = familyConf.split("=");
            if (familySplit.length != 2) {
                continue;
            }
            try {
                confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(StandardCharsets.UTF_8),
                        URLDecoder.decode(familySplit[1], "UTF-8"));
            } catch (UnsupportedEncodingException e) {
                // will not happen with UTF-8 encoding
                throw new AssertionError(e);
            }
        }
        return confValMap;
    }

    /**
     * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
     * <code>splitPoints</code>. Cleans up the partitions file after job exists.
     */
    static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints) throws IOException {
        Configuration conf = job.getConfiguration();
        // create the partitions file
        FileSystem fs = FileSystem.get(conf);
        Path partitionsPath = new Path(conf.get("hbase.fs.tmp.dir"), "partitions_" + RandomUtil.randomUUID());
        fs.makeQualified(partitionsPath);
        writePartitions(conf, partitionsPath, splitPoints);
        fs.deleteOnExit(partitionsPath);

        // configure job to use it
        job.setPartitionerClass(TotalOrderPartitioner.class);
        TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
    }

    /**
     * Serialize column family to compression algorithm map to configuration.
     * Invoked while configuring the MR job for incremental load.
     *
     * @param table to read the properties from
     * @param conf to persist serialized values into
     * @throws IOException
     *           on failure to read column family descriptors
     */
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
    @VisibleForTesting
    static void configureCompression(Configuration conf, HTableDescriptor tableDescriptor)
            throws UnsupportedEncodingException {
        StringBuilder compressionConfigValue = new StringBuilder();
        if (tableDescriptor == null) {
            // could happen with mock table instance
            return;
        }
        Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
        int i = 0;
        for (HColumnDescriptor familyDescriptor : families) {
            if (i++ > 0) {
                compressionConfigValue.append('&');
            }
            compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
            compressionConfigValue.append('=');
            compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getCompression().getName(), "UTF-8"));
        }
        // Get rid of the last ampersand
        conf.set(COMPRESSION_FAMILIES_CONF_KEY, compressionConfigValue.toString());
    }

    /**
     * Serialize column family to block size map to configuration.
     * Invoked while configuring the MR job for incremental load.
     * @param tableDescriptor to read the properties from
     * @param conf to persist serialized values into
     *
     * @throws IOException
     *           on failure to read column family descriptors
     */
    @VisibleForTesting
    static void configureBlockSize(HTableDescriptor tableDescriptor, Configuration conf)
            throws UnsupportedEncodingException {
        StringBuilder blockSizeConfigValue = new StringBuilder();
        if (tableDescriptor == null) {
            // could happen with mock table instance
            return;
        }
        Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
        int i = 0;
        for (HColumnDescriptor familyDescriptor : families) {
            if (i++ > 0) {
                blockSizeConfigValue.append('&');
            }
            blockSizeConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
            blockSizeConfigValue.append('=');
            blockSizeConfigValue.append(URLEncoder.encode(String.valueOf(familyDescriptor.getBlocksize()), "UTF-8"));
        }
        // Get rid of the last ampersand
        conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, blockSizeConfigValue.toString());
    }

    /**
     * Serialize column family to bloom type map to configuration.
     * Invoked while configuring the MR job for incremental load.
     * @param tableDescriptor to read the properties from
     * @param conf to persist serialized values into
     *
     * @throws IOException
     *           on failure to read column family descriptors
     */
    @VisibleForTesting
    static void configureBloomType(HTableDescriptor tableDescriptor, Configuration conf)
            throws UnsupportedEncodingException {
        if (tableDescriptor == null) {
            // could happen with mock table instance
            return;
        }
        StringBuilder bloomTypeConfigValue = new StringBuilder();
        Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
        int i = 0;
        for (HColumnDescriptor familyDescriptor : families) {
            if (i++ > 0) {
                bloomTypeConfigValue.append('&');
            }
            bloomTypeConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
            bloomTypeConfigValue.append('=');
            String bloomType = familyDescriptor.getBloomFilterType().toString();
            if (bloomType == null) {
                bloomType = HColumnDescriptor.DEFAULT_BLOOMFILTER;
            }
            bloomTypeConfigValue.append(URLEncoder.encode(bloomType, "UTF-8"));
        }
        conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, bloomTypeConfigValue.toString());
    }

    /**
     * Serialize column family to data block encoding map to configuration.
     * Invoked while configuring the MR job for incremental load.
     *
     * @param table to read the properties from
     * @param conf to persist serialized values into
     * @throws IOException
     *           on failure to read column family descriptors
     */
    @VisibleForTesting
    static void configureDataBlockEncoding(HTableDescriptor tableDescriptor, Configuration conf)
            throws UnsupportedEncodingException {
        if (tableDescriptor == null) {
            // could happen with mock table instance
            return;
        }
        StringBuilder dataBlockEncodingConfigValue = new StringBuilder();
        Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
        int i = 0;
        for (HColumnDescriptor familyDescriptor : families) {
            if (i++ > 0) {
                dataBlockEncodingConfigValue.append('&');
            }
            dataBlockEncodingConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
            dataBlockEncodingConfigValue.append('=');
            DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
            if (encoding == null) {
                encoding = DataBlockEncoding.NONE;
            }
            dataBlockEncodingConfigValue.append(URLEncoder.encode(encoding.toString(), "UTF-8"));
        }
        conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY, dataBlockEncodingConfigValue.toString());
    }
}
